This notebook uses nevergrad to roughly fit a simple epidemiological model to the COVID-19 data for the regions in Italy then runs Markov Chain Monte Carlo sampling on the initialized model using PyMC3 to find a distribution over the model parameters and predictions.
The model tracks population states corresponding to the data categories plus exposed (a pre-infectious stage) and undetected infectious. Deaths and recoveries for undetected cases are grouped together as there is nothing to distinguish them in the data or model. The susceptible category is not included here, making this model only viable when a small fraction of the population becomes infected. Given the noise in the data, this approximation should be viable up to at least 1% of the population, perhaps has high as 10%. Beyond that, the model will overestimate infection rate.
import theano
theano.config.gcc.cxxflags = "-Wno-c++11-narrowing" # Silence a harmless warning
import pickle
import holoviews as hv
import nevergrad as ng
import numpy as np
import pandas as pd
import pymc3 as pm
import theano.tensor as tt
import xarray as xr
The model code is partially written outside this notebook for the convenience of an IDE, for better reuse, and so that Nevergrad could be run with multiprocessing for parallel execution, which requires the target function be importable. Nevergrad is used to minimize seir.simple_eir.func (dropping the module name hereafter), which is a wrapper around calc_score(*ode(*transform(**kwargs))). The innermost function, transform takes the unbounded inputs and maps them to their model variables (e.g. positive only or within $[0, 1)$). Transformed values are passed to the ode function which runs a Euler-integrated (S)EIR model. Then the ODE outputs are scored against the data.
%load_ext autoreload
%autoreload 1
%aimport seir.simple_eir
hv.notebook_extension('bokeh', logo=False)
The exponential scaling of this model makes scale difficult to determine without limitations placed on undetected cases. Fortunately, the Italy data includes total tests administered. I am not aware of any standard model for the effectiveness of testing. A naive approach would be to say that individuals are tested at random and the probability of detecting a case is proportional to the number of tests administered times the number of currently undetected infectious individuals (assuming those detected are not tested again). It may also be reasonable to assume for low testing rates that the testing is targeted with some efficiency such that infectious individuals are more likely to be tested than others by some constant. Early testing with this model suggests that is not accurate for Italy, where the level of testing is sufficiently high that this model breaks down.
Instead, to model diminishing returns in testing, we model the detection rate as proportional to the square root of (smoothed) new tests administered times infectious individuals.
ds = seir.simple_eir.ds
x = ds['tamponi']
x = x.diff(dim='data')
hv.HoloMap({
str(xi['denominazione_regione'].values): hv.Curve(xi, ('data', 'Date'), ('tamponi', 'Δ Tests'))
for xi in x
}, 'Region').overlay().options(aspect=5/3, responsive=True, legend_position='right', cmap='Category20')
ds = seir.simple_eir.ds
new_tests = ds['tamponi'].rolling({'data': 7}, center=False, min_periods=1).mean()
new_tests = new_tests.diff(dim='data') + 1
sqrt_new_tests = np.sqrt(new_tests)
hv.HoloMap({
str(xi['denominazione_regione'].values): hv.Curve(xi, 'data', 'tamponi')
for xi in sqrt_new_tests
}, 'Region').overlay().options(aspect=5/3, responsive=True, logy=True, legend_position='right')
ds = seir.simple_eir.ds
x = ds['tamponi'].rolling({'data': 7}, center=False, min_periods=1).mean()
x = x.diff(dim='data') + 1
x = ds['nuovi_positivi'] / np.sqrt(x) * ds['totale_positivi']
x.name = 'new_per_test'
hv.HoloMap({
str(xi['denominazione_regione'].values): hv.Scatter(xi, 'data', 'new_per_test')
for xi in x
}, 'Region').overlay().options(aspect=5/3, responsive=True, logy=True, legend_position='right'
).redim.range(new_per_test=(1e-4, None))
Nevergrad is run outside this notebook and temporary and the final parameter sets are saved in a pickle file. Below, they are loaded to be used for PyMC3 model initialization.
with open('seir/tmp_best.pkl', 'rb') as f:
kwargs = pickle.load(f)
print(seir.simple_eir.transform(**{k: np.average(v) for k, v in kwargs.items()}))
print([(np.min(v), np.average(v), np.max(v)) for v in seir.simple_eir.transform(**kwargs)])
print(seir.simple_eir.func(**kwargs))
exposed, infectious, detected, recovered, dead = seir.simple_eir.ode(
*seir.simple_eir.transform(**kwargs)
)
# Holoviews handles NaN better than inf
exposed[np.isinf(exposed)] = np.nan
infectious[np.isinf(infectious)] = np.nan
detected[np.isinf(detected)] = np.nan
recovered[np.isinf(recovered)] = np.nan
dead[np.isinf(dead)] = np.nan
# Plot a few regions in linear scale for an initial look at the model fit
hv.HoloMap({
(region, variable):
hv.Curve((seir.simple_eir.ds['data'].values, data[region_i, :]), 'Date', 'Count', label='exposed')
* (hv.Scatter(seir.simple_eir.ds.isel(denominazione_regione=region_i), 'data', ds_name)
if ds_name is not None else hv.Scatter(([], [])))
for region_i, region in ((seir.simple_eir.region_names.index(region), region)
for region in ['Lombardia', 'Veneto'])
for variable, data, ds_name in [
('exposed', exposed, None),
('infectious', infectious, None),
('detected', detected, 'totale_positivi'),
('recovered', recovered, 'dimessi_guariti'),
('dead', dead, 'deceduti'),
]
}, ['Region', 'Variable']).overlay('Variable').options(aspect=3, responsive=True).layout().cols(1)
# Plot the nevergrad fit for all regions on log scale
(
hv.HoloMap({
(seir.simple_eir.region_names[region_i], variable):
hv.Curve((seir.simple_eir.ds['data'].values, data[region_i, :]), 'Date', 'Count', label='exposed')
* (hv.Scatter(seir.simple_eir.ds.isel(denominazione_regione=region_i), 'data', ds_name)
if ds_name is not None else hv.Scatter(([], [])))
for region_i in range(seir.simple_eir.n_regions)
for variable, data, ds_name in [
('exposed', exposed, None),
('infectious', infectious, None),
('detected', detected, 'totale_positivi'),
('recovered', recovered, 'dimessi_guariti'),
('dead', dead, 'deceduti'),
]
}, ['Region', 'Variable']).overlay('Variable')
.options(aspect=3, responsive=True, legend_position='right', logy=True)
.redim.range(Count=(0.1, None))
.layout().cols(1)
)
# Calculate the transformed parameters and ODE trace
(initial_exposed, initial_infectious, beta, beta_detected_ratio,
sigma, theta, gamma_mu, gamma_detected, mu_detected, mu_ratio) = seir.simple_eir.transform(**kwargs)
beta = beta[:, :-1]
beta_detected_ratio = beta_detected_ratio[:, :-1]
mu_ratio = mu_ratio[:-1]
exposed, infectious, detected, recovered, dead = seir.simple_eir.ode(
initial_exposed, initial_infectious, beta, beta_detected_ratio,
sigma, theta, gamma_mu, gamma_detected, mu_detected, mu_ratio
)
assert np.all(np.isfinite(exposed))
assert np.all(np.isfinite(infectious))
assert np.all(np.isfinite(detected))
assert np.all(np.isfinite(recovered))
assert np.all(np.isfinite(dead))
assert not np.any(np.isnan(infectious))
assert not np.any(np.isnan(infectious))
assert not np.any(np.isnan(detected))
assert not np.any(np.isnan(recovered))
assert not np.any(np.isnan(dead))
x = theta
print(x.min(), x.mean(), x.max())
x = (theta * seir.simple_eir.sqrt_new_tests / 1e2)
print(x.min(), x.mean(), x.max())
x = beta_detected_ratio
print(x.min(), x.mean(), x.max())
x = beta_detected_ratio * (1 - 2e-4) + 1e-4
print(x.min(), x.mean(), x.max())
Below, we create the PyMC3 model. This model is a Stochastic differential equation (SDE). It defines the timeseries as noisy latent variables then computes the predicted next time step for every time step. The model is then constrained with an observed variable pushing the predictions and the latent values together.
n_regions = seir.simple_eir.n_regions
n_time = seir.simple_eir.n_time
SD = 3
with pm.Model() as model:
noise = pm.Normal('exposed_noise', 0, 1, shape=(n_regions, n_time))
mu = exposed
sd = np.sqrt(100 ** 2 + 0.30 ** 2 * mu * mu)
exposed = pm.Deterministic('exposed', pm.math.maximum(0, mu + sd * noise))
noise = pm.Normal('infectious_noise', 0, 1, shape=(n_regions, n_time))
mu = infectious
sd = np.sqrt(100 ** 2 + 0.30 ** 2 * mu * mu)
infectious = pm.Deterministic('infectious', pm.math.maximum(0, mu + sd * noise))
noise = pm.Normal('detected_noise', 0, 1, shape=(n_regions, n_time))
mu = seir.simple_eir.ds['totale_positivi'].values
sd = np.sqrt(20 ** 2 + 0.15 ** 2 * mu * mu)
detected = pm.Deterministic('detected', pm.math.maximum(0, mu + sd * noise))
noise = pm.Normal('recovered_noise', 0, 1, shape=(n_regions, n_time))
mu = seir.simple_eir.ds['dimessi_guariti'].values
sd = np.sqrt(40 ** 2 + 0.20 ** 2 * mu * mu)
recovered = pm.Deterministic('recovered', pm.math.maximum(0, mu + sd * noise))
noise = pm.Normal('dead_noise', 0, 1, shape=(n_regions, n_time))
mu = seir.simple_eir.ds['deceduti'].values
sd = np.sqrt(10 ** 2 + 0.10 ** 2 * mu * mu)
dead = pm.Deterministic('dead', pm.math.maximum(0, mu + sd * noise))
beta = pm.Lognormal(
name='beta',
mu=np.log(np.maximum(1e-4, beta)),
sd=SD,
shape=(n_regions, seir.simple_eir.n_eras),
)[..., seir.simple_eir.era_indices[:-1]]
beta_detected_ratio = pm.Uniform(
name='beta_detected_ratio',
lower=0,
upper=1,
testval=beta_detected_ratio * (1 - 2e-4) + 1e-4,
shape=(n_regions, seir.simple_eir.n_eras),
)[..., seir.simple_eir.era_indices[:-1]]
sigma = pm.Lognormal(
name='sigma',
# mu=np.log(0.3),
mu=np.log(sigma),
sd=SD,
)
theta = pm.math.minimum(0.95, pm.Lognormal(
name='theta',
mu=np.log(theta),
sd=SD,
shape=(n_regions, 1),
) * seir.simple_eir.sqrt_new_tests / 1e2)
gamma_mu = pm.Lognormal(
name='gamma_mu',
# mu=np.log(0.35),
mu=np.log(gamma_mu),
sd=SD,
)
gamma_detected = pm.Lognormal(
name='gamma_detected',
# mu=np.log(0.008),
mu=np.log(gamma_detected),
sd=SD,
)
mu_detected = pm.Lognormal(
name='mu_detected',
# mu=np.log(0.005),
mu=np.log(mu_detected),
sd=SD,
shape=n_regions,
)
mu_ratio = pm.math.concatenate([
np.ones(1),
pm.Uniform(
name='mu_ratio',
lower=0.5,
upper=1,
testval=mu_ratio,
shape=seir.simple_eir.n_eras-1,
)
])[..., seir.simple_eir.era_indices[:-1]]
exposed0, exposed1 = exposed[:, :-1], exposed[:, 1:]
infectious0, infectious1 = infectious[:, :-1], infectious[:, 1:]
detected0, detected1 = detected[:, :-1], detected[:, 1:]
recovered0, recovered1 = recovered[:, :-1], recovered[:, 1:]
dead0, dead1 = dead[:, :-1], dead[:, 1:]
newly_exposed = beta * infectious0 + beta * beta_detected_ratio * detected0
progressed = sigma * exposed0
detections = theta * infectious0
recoveries_detected = gamma_detected * detected0
deaths_detected = mu_ratio[None, :] * mu_detected[:, None] * detected0
exposed2 = exposed0 + newly_exposed - progressed
infectious2 = infectious0 + progressed - detections - gamma_mu * infectious0
detected2 = detected0 + detections - recoveries_detected - deaths_detected
recovered2 = recovered0 + recoveries_detected
dead2 = dead0 + deaths_detected
def sde_potential(name, y1, y2):
sd = np.sqrt(((8.0 ** 2) + (0.04 ** 2 * y2 * y2)))
mu = pm.Deterministic(f'innovation_{name}', (y1-y2)/sd)
pm.Normal(f'sde_{name}', mu=mu, sd=1, observed=0)
# pm.Normal(f'sde_{name}', mu=y1-y2, sd=sd, observed=0)
sde_potential('exposed', exposed1, exposed2)
sde_potential('infectious', infectious1, infectious2)
sde_potential('detected', detected1, detected2)
sde_potential('recovered', recovered1, recovered2)
sde_potential('dead', dead1, dead2)
Before sampling, let's do a sanity check on the model values.
# Create a function to evaluate a model variable using the latent variable initial values
defaults = {str(v): v.init_value for v in model.vars}
def f(x):
return theano.function(model.vars, x, on_unused_input='ignore')(**defaults)
# Which regions have NaNs in these variables?
{str(x): list(np.asarray(seir.simple_eir.region_names)[np.isnan(f(x)).any(axis=1)]) for x in model.deterministics[:5]}
# Which regions have NaNs in these variables?
{str(x): list(np.asarray(seir.simple_eir.region_names)[np.isnan(f(x)).any(axis=1)]) for x in model.deterministics[-5:]}
# Are there any NaNs in our parameters?
{str(x): np.isnan(f(x)).any() for x in model.deterministics[5:-5]}
# Are any logp values NaN, Inf, or really small?
{x: f(x.logpt) for x in model.vars}
To visualize the SDE model fit, the following plot shows a line segment from every point in the timeseries to the predicted point next. When that predicted point is close to the actual next point the line appears smooth. When there is variation, you see a saw-tooth pattern.
hv.HoloMap({
(region, varname): hv.Curve((
np.stack([
seir.simple_eir.ds['data'].values[None, :-1],
seir.simple_eir.ds['data'].values[None, 1:],
seir.simple_eir.ds['data'].values[None, :-1],
], axis=-1).ravel(),
np.stack([
x0[None, region_i, :],
x1[None, region_i, :],
np.nan * x0[None, region_i, :],
], axis=-1).ravel(),
), 'Date', varname)
.options(alpha=0.4)
for region_i, region in ((seir.simple_eir.region_names.index(region), region)
for region in [
'Abruzzo',
'Lombardia',
'Veneto',
])
for varname, x0, x1 in [
('Exposed', f(exposed0), f(exposed2)),
('Infectious', f(infectious0), f(infectious2)),
('Detected', f(detected0), f(detected2)),
('Recovered', f(recovered0), f(recovered2)),
('Dead', f(dead0), f(dead2)),
]
}, ['Region', 'Variable']).overlay('Region').options(aspect=3, responsive=True).layout().cols(1)
Another view of the same information, the following plot shows the "innovation" or the difference between the input next point and the SDE predicted next point scaled by the modeled SDE noise level.
hv.HoloMap({
(region, varname): hv.Curve((
seir.simple_eir.ds['data'].values[None, :-1],
x0[None, region_i, :],
), 'Date', varname)
.options(alpha=0.4)
for region_i, region in ((seir.simple_eir.region_names.index(region), region)
for region in [
'Abruzzo',
'Lombardia',
'Veneto',
])
for varname, x0 in [
('Exposed', f(model.deterministics[-5])),
('Infectious', f(model.deterministics[-4])),
('Detected', f(model.deterministics[-3])),
('Recovered', f(model.deterministics[-2])),
('Dead', f(model.deterministics[-1])),
]
}, ['Region', 'Variable']).overlay('Region').options(aspect=3, responsive=True).layout().cols(1)
with model:
trace = pm.sample(1_000, tune=1_000, target_accept=0.99, compute_convergence_checks=False)
pm.pairplot(trace, var_names=[
# 'beta_detected_ratio',
'sigma',
#'theta',
'gamma_mu',
'gamma_detected',
#'mu_detected',
], divergences=True)
k = 64
i = np.random.randint(len(trace) * trace.nchains, size=k)
t = np.repeat(seir.simple_eir.ds['data'].values[None, :-1], k, 0).ravel()
hv.HoloMap({
(seir.simple_eir.region_names[region_i], group):
hv.Scatter((t, trace[f'innovation_{group}'][i, region_i, :].ravel()), 'Date', 'Deviation from SEIR model')
.options(alpha=0.1, color=color)
for region_i, color, linecolor in [
(7, 'blue', 'white'),
(16, 'red', 'black'),
(0, 'green', 'yellow'),
]
for group in [
'exposed',
'infectious',
'detected',
'recovered',
'dead',
]
}, ['Region', 'Variable']).overlay('Region').options(aspect=2, responsive=True).layout().cols(1)
hv.HoloMap({
seir.simple_eir.region_names[region_i]: (
hv.Overlay([
hv.Curve((
seir.simple_eir.ds['data'],
trace['detected'][k, region_i, :] +
trace['recovered'][k, region_i, :] +
trace['dead'][k, region_i, :]
), 'Date', 'Total Detected Cases')
.options(alpha=0.1, color=color)
for k in i
])
*
hv.Scatter((seir.simple_eir.ds['data'], seir.simple_eir.ds['totale_casi'][region_i]))
.options(color=color, line_color=linecolor, alpha=0.2)
)
for region_i, color, linecolor in [
(7, 'blue', 'white'),
(16, 'red', 'black'),
(0, 'green', 'yellow'),
]
}, ['Region']).overlay().options(aspect=5/3, responsive=True)
hv.HoloMap({
(ylabel, seir.simple_eir.region_names[region_i]): (
hv.Overlay([
hv.Curve((seir.simple_eir.ds['data'], trace[tracename][k, region_i, :]), 'Date', ylabel)
.options(alpha=0.1, color=color)
for k in i
])
*
hv.Scatter((seir.simple_eir.ds['data'], seir.simple_eir.ds[dsname][region_i]))
.options(color=color, line_color=linecolor, alpha=0.2)
)
for tracename, ylabel, dsname in [
('detected', 'Detected Cases', 'totale_positivi'),
('recovered', 'Recovered Cases', 'dimessi_guariti'),
('dead', 'Deaths', 'deceduti'),
]
for region_i, color, linecolor in [
(7, 'blue', 'white'),
(16, 'red', 'black'),
(0, 'green', 'yellow'),
]
}, ['Variable', 'Region']).overlay('Region').options(aspect=3, responsive=True).layout().cols(1)
hv.HoloMap({
(ylabel, seir.simple_eir.region_names[region_i]): (
hv.Overlay([
hv.Curve((seir.simple_eir.ds['data'], trace[tracename][k, region_i, :]), 'Date', ylabel)
.options(alpha=0.1, color=color)
for k in i
])
)
for tracename, ylabel in [
('exposed', 'Exposed'),
('infectious', 'Infectious'),
]
for region_i, color, linecolor in [
(7, 'blue', 'white'),
(16, 'red', 'black'),
(0, 'green', 'yellow'),
]
}, ['Variable', 'Region']).overlay('Region').options(aspect=3, responsive=True).layout().cols(1)